Moleculer 框架也支援 Middlewares ,類似插入模組套件的概念。 Middlewares 是一個帶有 Hook 與包裝函數的 Object ,它允許包裝 Action 處理程序、事件處理程序、Broker 方法及 Hook 生命週期事件。
範例:
awesome.middleware.js
module.exports = {
    name: "Awesome",
    localAction(next, action) {
        return function(ctx) {
            console.log(`My middleware is called before the `${ctx.action.name}` action executed.`);
            return next(ctx);
        }
    }
};
moleculer.config.js
module.exports = {
    middlewares: true,
};
Hooks 中有一些包裝函數,你可以利用它包裝原始處理程序,然後返回一個新的函數。包裝 Hooks 的第一個參數會是 next 。
範例:以 localAction 為例,包裝本地處理程序
const MyDoSomethingMiddleware = {
    localAction(next, action) {
        // 假如功能被啟用則包裝它,否則直接返回原始處理程序
        if (action.myFunc) {
            // 包裝處理程序
            return function(ctx) {
                doSomethingBeforeHandler(ctx);
                return next(ctx)
                    .then(res => {
                        doSomethingAfterHandler(res);
                        // 返回原始結果
                        return res;
                    })
                    .catch(err => {
                        doSomethingAfterHandlerIfFailed(err);
                        // 拋出錯誤
                        throw err;
                    });
            }
        }
        return next;
    }
};
範例:參數驗證 middleware
在
localAction中的next會是一個原始的處理程序,或者是經過包裝的處理程序。然而 middleware 也應該要返回一個原始的處理程序,或者是新的包裝處理程序。所以你可以發現範例中的 middleware 會確認action.params是否存在,如果存在才返回一個驗證模組的包裝處理程序。假如action.params不存在則返回原始的處理程序。
const MyValidator = {
    localAction(next, action) {
        // 假如 `action.params` 存在,則包裝參數驗證器
        if (_.isObject(action.params)) {
            return ctx => {
                this.validate(action.params, ctx.params);
                return next(ctx);
            };
        }
        return next;
    }
};
範例:快取 middleware
除了返回
next以外,你也可以返回快取。例如當發現了請求資料中包含了快取,你可以直接取得快取內容來替代next。注意next返回的是一個Promise。
const MyCacher = {
    localAction(next, action) {
        return async function cacherMiddleware(ctx) {
            const cacheKey = this.getCacheKey(action.name, ctx.params, action.cache.keys);
            const content = await this.get(cacheKey);
            // 如果快取存在,直接返回快取內容
            if (content != null) {
                ctx.cachedResult = true;
                return content;
            }
            // 呼叫 next
            const result = await next(ctx);
            // 儲存快取內容
            this.set(cacheKey, result);
            return result;
        }.bind(this);
    }
};
Middleware 函數可用於在 ServiceBroker 或 Service 類別加入一個新的功能。
moleculer.config.js
module.exports = {
    middlewares: [
        {
            // Broker 建立後
            created(broker) {
                // 在 broker 加上客製的 allCall 功能函數,用來呼叫所有可用的節點
                broker.allCall = function (action, params, opts = {}) {
					// 取得所有節點 ID
                    const nodeIDs = this.registry.getNodeList({
                        onlyAvailable: true
                    }).map(node => node.id);
                    // 呼叫所有節點
                    return Promise.all(
                        nodeIDs.map(nodeID => broker.call(
                            action, params, Object.assign({ nodeID }, opts)
                        ))
                    );
                };
            }
        }
    ]
};
使用客製的 allCall 功能函數,呼叫所有節點的健康狀況:
const res = await broker.allCall("$node.health");
官方提供非常多的 Hooks ,由於鐵人賽時間有限,僅簡單列出所有的 Hooks 使用方式。
my.middleware.js
module.export = {
    name: "MyMiddleware",
	// 本地 Action
    localAction(next, action) {
        return function (ctx) {
            // 變更 context 屬性或其它內容
            return next(ctx)
                .then(res => {
                    // 變更響應內容
                    return res;
                })
                .catch(err => {
                    // 錯誤處理或拋出異常
                    throw err;
                });
        };
    },
	// 遠端 Action
    remoteAction(next, action) {
        return function (ctx) {
            // 變更 context 屬性或其它內容
            return next(ctx)
                .then(res => {
                    // 變更響應內容
                    return res;
                })
                .catch(err => {
                    // 錯誤處理或拋出異常
                    throw err;
                });
        };
    },
	// 本地事件
    localEvent(next, event) {
        return (ctx) => {
            return next(ctx);
        };
    },
	// 本地方法
    localMethod(next, method) {
        return (...args) => {
            console.log(`The '${method.name}' method is called in '${method.service.fullName}' service.`, args);
            return next(...args);
        };
    },
	// broker.createService 的 Hook
    createService(next) {
        return function (schema, schemaMods) {
            console.log("The 'createService' is called.");
            return next(schema, schemaMods);
        };
    },
	// broker.destroyService 的 Hook
    destroyService(next) {
        return function (service) {
            console.log("The 'destroyService' is called.");
            return next(service);
        };
    },
	// broker.call 的 Hook
    call(next) {
        return function (actionName, params, opts) {
            console.log("The 'call' is called.", actionName);
            return next(actionName, params, opts).then(res => {
                console.log("Response:", res);
                return res;
            });
        };
    },
	// broker.mcall 的 Hook
    mcall(next) {
        return function () {
            console.log("The 'mcall' is called.");
            return next(...arguments).then(res => {
                console.log("Response:", res);
                return res;
            });
        };
    },
	// broker.emit 的 Hook
    emit(next) {
        return function (eventName, payload, opts) {
            console.log("The 'emit' is called.", eventName);
            return next(eventName, payload, opts);
        };
    },
	// broker.broadcast 的 Hook
    broadcast(next) {
        return function (eventName, payload, opts) {
            console.log("The 'broadcast' is called.", eventName);
            return next(eventName, payload, opts);
        };
    },
	// broker.broadcastLocal 的 Hook
    broadcastLocal(next) {
        return function (eventName, payload, opts) {
            console.log("The 'broadcastLocal' is called.", eventName);
            return next(eventName, payload, opts);
        };
    },
	// 服務建立後 (同步)
    serviceCreated(service) {
        console.log("Service created", service.fullName);
    },
	// 服務啟動前 (異步)
    serviceStarting(service) {
        console.log("Service is starting", service.fullName);
    },
	// 服務啟動後 (異步)
    serviceStarted(service) {
        console.log("Service started", service.fullName);
    },
	// 服務停止前 (異步)
    serviceStopping(service) {
        console.log("Service is stopping", service.fullName);
    },
	// 服務停止後 (異步)
    serviceStopped(service) {
        console.log("Service stopped", service.fullName);
    },
	// 服務註冊
    registerLocalService(next) {
        return (service) => {
            console.log("Registering a local service", service.name);
            return next(service);
        };
    },
	// 服務建立前,會在 Merged 完成後
    serviceCreating(service, schema) {
        // 變更 schema
        schema.myProp = "John";
    },
	// Transit 發送前
    transitPublish(next) {
        return (packet) => {
            return next(packet);
        };
    },
	// Transit 接收解析前
    transitMessageHandler(next) {
        return (cmd, packet) => {
            return next(cmd, packet);
        };
    },
	// Transporter 發送前
    transporterSend(next) {
        return (topic, data, meta) => {
            // 變更 data 內容,注意 data 是個 Buffer
            return next(topic, data, meta);
        };
    },
	// Transporter 接收後
    transporterReceive(next) {
        return (cmd, data, s) => {
            // 變更 data 內容,注意 data 是個 Buffer
            return next(cmd, data, s);
        };
    },
	// 建立 Log (同步)
    newLogEntry(type, args, bindings) {
        // 變更 `args` 內容
    },
	// 建立 Broker (異步)
    created(broker) {
        console.log("Broker created");
    },
	// 啟動 Broker 前 (異步)
    starting(broker) {
        console.log("Broker is starting");
    },
	// 啟動 Broker 後 (異步)
    started(broker) {
        console.log("Broker started");
    },
	// 停止 Broker 前 (異步)
    stopping(broker) {
        console.log("Broker is stopping");
    },
	// 停止 Broker 後 (異步)
    stopped(broker) {
        console.log("Broker stopped");
    },
};
更多詳細的使用方法請參考官方手冊:
https://moleculer.services/docs/0.14/middlewares.html#Hooks
Moleculer 的許多功能都有內部的 middlewares ,它會在 broker 建立時被自動載入。然而你也可以在 Broker 選項設定 internalMiddlewares: false 來關閉它,但在這種情況下你必須在 Broker 的 middlewares: [] 選項中,明確指定所需的 middlewares 。
清單:
Class name Type Description
| 名稱 | 類型 | 說明 | 
|---|---|---|
| ActionHook | Optional | Action hooks 處理程序 | 
| Validator | Optional | 參數驗證器 | 
| Bulkhead | Optional | Bulkhead 功能 | 
| Cacher | Optional | 快取 | 
| ContextTracker | Optional | Context tracker 功能 | 
| CircuitBreaker | Optional | Circuit Breaker 功能 | 
| Timeout | Always | Timeout 功能 | 
| Retry | Always | Retry 功能 | 
| Fallback | Always | Fallback 功能 | 
| ErrorHandler | Always | 錯誤處理 | 
| Tracing | Optional | 追蹤功能 | 
| Metrics | Optional | Metrics 功能 | 
| Debounce | Optional | Debounce 功能 | 
| Throttle | Optional | Throttle 功能 | 
| Transmit.Encryption | Optional | 加密傳輸 | 
| Transmit.Compression | Optional | 壓縮傳輸 | 
| Debugging.TransitLogger | Optional | Transit Logger | 
| Debugging.ActionLogger | Optional | Action Logger | 
範例:
const { Bulkhead, Retry } = require("moleculer").Middlewares;
使用 Node.js 內建的 crypto 加密函式庫,加入 AES middleware 來做加密,以增加通訊傳輸的安全性。
moleculer.config.js
const crypto = require("crypto");
const { Middlewares } = require("moleculer");
const initVector = crypto.randomBytes(16);
module.exports = {
    middlewares: [
        Middlewares.Transmit.Encryption(
            "secret-password", "aes-256-cbc", initVector
        )
    ]
};
使用壓縮 middleware 可以減少 Transporter 傳輸的資料大小,內建是使用 Node.js 的 zlib 函式庫來處理,可以使用 deflate 、 deflateRaw 或 gzip 。
moleculer.config.js
const { Middlewares } = require("moleculer");
module.exports = {
    middlewares: [
        Middlewares.Transmit.Compression("deflate")
    ]
};
Transit logger middleware 讓我們很容易的追蹤服務與服務間的訊息交換。
moleculer.config.js
const { Middlewares } = require("moleculer");
module.exports = {
  middlewares: [
    Middlewares.Debugging.TransitLogger({
      logPacketData: false,
      folder: null,
      colors: {
        send: "magenta",
        receive: "blue"
      },
      packetFilter: ["HEARTBEAT"]
    })
  ]
};
參數:
| 名稱 | 類型 | 預設值 | 說明 | 
|---|---|---|---|
logger | 
<Object> | <Function> | null | 
Logger 類別 | 
logLevel | 
<String> | info | 
Log 等級 | 
logPacketData | 
<Boolean> | false | 
Logs packet 參數 | 
folder | 
<Object> | null | 
輸出目錄 | 
extension | 
<String> | .json | 
Log 副檔名 | 
color.receive | 
<String> | grey | 
接收顏色[2] | 
color.send | 
<String> | grey | 
送出顏色[2] | 
packetFilter | 
<String[]> | HEARTBEAT | 
要跳過的packet[3] | 
Action Logger middleware 是用來追蹤服務 Actions 是如何執行的。
moleculer.config.js
const { Middlewares } = require("moleculer");
module.exports = {
  middlewares: [
    Middlewares.Debugging.ActionLogger({
      logParams: true,
      logResponse: true,
      folder: null,
      colors: {
        send: "magenta",
        receive: "blue"
      },
      whitelist: ["**"]
    })
  ]
};
| 名稱 | 類型 | 預設值 | 說明 | 
|---|---|---|---|
logger | 
<Object> | <Function> | null | 
Logger 類別 | 
logLevel | 
<String> | info | 
Log 等級 | 
logParams | 
<Boolean> | false | 
紀錄請求參數 | 
logMeta | 
<Boolean> | false | 
紀錄 meta 參數 | 
folder | 
<Object> | null | 
輸出目錄 | 
extension | 
<String> | .json | 
Log 副檔名 | 
color.request | 
<String> | yellow | 
請求顏色[2] | 
color.response | 
<String> | cyan | 
響應顏色[2] | 
colors.error | 
<String> | red | 
錯誤顏色[2] | 
whitelist | 
<String[]> | ["**"] | 
要記錄的 Actions 名稱,可使用正則表達式或萬用字元 | 
節流可以用來降低事件的觸發頻率。由於它會以固定的速率去觸發監聽器,因此監聽器會忽略事件的部分資訊。它使用與 Lodash 的 _.throttle 相同的方法[4] 。
my.service.js
module.exports = {
    name: "my",
    events: {
        "config.changed": {
            throttle: 3000,
            // 3 秒內不會再次呼叫
            handler(ctx) { /* ... */}
        }
    }
};
防抖不同於節流,當事件不斷的觸發時它不會呼叫,直到事件停止觸發後等待一段時間才會呼叫處理程序。它使用與 Lodash 的 _.debounce 相同的方法[5] 。
my.service.js
module.exports = {
    name: "my",
    events: {
        "config.changed": {
            debounce: 5000,
            // Handler will be invoked when events are not received in 5 seconds.
            handler(ctx) { /* ... */}
        }
    }
};
關於節流與防抖的差異,官方手冊推薦可以看 David Corbacho 的文章[6]。
客製化函數可以直接在 Middlewares 擴充,然後就可以像內建的 middlewares 函數一樣可以在 middlewares[] 直接加入使用。
moleculer.config.js
const { Middlewares } = require("moleculer");
// 建立一個客製化擴充 middleware
Middlewares.MyCustom = {
    created(broker) {
        broker.logger.info("My custom middleware is created!");
    }
};
module.exports = {
    logger: true,
    middlewares: [
        // 直接使用客製化 middleware
        "MyCustom"
    ]
};

Fig. 1. Moleculer middleware
[1] Middlewares, https://moleculer.services/docs/0.14/middlewares.html
[2] Chalk colors, https://github.com/chalk/chalk#colors
[3] Protocol 4.0 (rev. 1), https://github.com/moleculer-framework/protocol/blob/master/4.0/PROTOCOL.md
[4] Lodash throttle, https://lodash.com/docs/4.17.15#throttle
[5] Lodash debounce, https://lodash.com/docs/4.17.15#debounce
[6] David Corbacho, Debouncing and Throttling Explained Through Examples, https://css-tricks.com/debouncing-throttling-explained-examples/